
Synchronization of Entire Database into LakeSoul

Since version 2.1.0, LakeSoul has provided a Flink CDC Sink that supports the Table API and SQL (single table) as well as the Stream API (multiple tables within a database). The currently supported upstream data sources are MySQL (5.6-8.0), Oracle (11, 12, 19, 21), and PostgreSQL (10-14). The unified entry class for data ingestion into the lake is JdbcCDC.

Main features

In the Stream API, the main functions of LakeSoul Sink are:

  • Real-time CDC synchronization of thousands of tables (with different schemas) in a single Flink job; each table is automatically written to the LakeSoul table with the corresponding name.
  • For MySQL and PostgreSQL, schema changes (DDL) are automatically synchronized so that downstream reads stay compatible between old and new data. Adding and deleting columns and increasing the precision of numeric types are currently supported.
  • For MySQL and PostgreSQL, tables newly created in the upstream database are detected automatically at runtime, and the corresponding tables are created in LakeSoul.
  • For Oracle, only tables whose schema does not change can be synchronized (adding or deleting columns is not allowed), and newly created tables are not synchronized.
  • A Flink command-line entry class is provided, with parameters such as the database name, table name blacklist, and parallelism.

How to use the command line

The LakeSoul Flink jar can be downloaded from the LakeSoul Release page: https://github.com/lakesoul-io/LakeSoul/releases/download/v2.5.0/lakesoul-flink-2.5.0-flink-1.17.jar

The currently supported Flink version is 1.17.

2.1 Add LakeSoul metadata database configuration

Add the following configuration to $FLINK_HOME/conf/flink-conf.yaml:

containerized.master.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.master.env.LAKESOUL_PG_USERNAME: root
containerized.master.env.LAKESOUL_PG_PASSWORD: root
containerized.master.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
containerized.taskmanager.env.LAKESOUL_PG_DRIVER: com.lakesoul.shaded.org.postgresql.Driver
containerized.taskmanager.env.LAKESOUL_PG_USERNAME: root
containerized.taskmanager.env.LAKESOUL_PG_PASSWORD: root
containerized.taskmanager.env.LAKESOUL_PG_URL: jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified

Note that both the master and taskmanager environment variables need to be set.

tip

The connection information, username and password of the Postgres database need to be modified according to the actual deployment.

caution

If you start the job in Session mode, that is, submit it from a client to a Flink Standalone Cluster, the flink run client does not read the configuration above, so the environment variables must also be set separately in the client's shell:

export LAKESOUL_PG_DRIVER=com.lakesoul.shaded.org.postgresql.Driver
export LAKESOUL_PG_URL=jdbc:postgresql://localhost:5432/test_lakesoul_meta?stringtype=unspecified
export LAKESOUL_PG_USERNAME=root
export LAKESOUL_PG_PASSWORD=root
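
Because the client process only sees variables exported in its own environment, a small pre-flight check can catch a missing setting before the job is submitted. The following is a minimal Python sketch and not part of LakeSoul: the helper name `missing_lakesoul_env` is hypothetical, and only the four variable names are taken from the configuration above.

```python
import os

# Variable names copied from the configuration shown above.
REQUIRED_VARS = (
    "LAKESOUL_PG_DRIVER",
    "LAKESOUL_PG_URL",
    "LAKESOUL_PG_USERNAME",
    "LAKESOUL_PG_PASSWORD",
)


def missing_lakesoul_env(env=None):
    """Return the required variables that are unset or empty in `env`.

    `env` defaults to the environment of the current process, which is
    what a `flink run` client started from the same shell would inherit.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling `missing_lakesoul_env()` from the shell that will run `flink run` returns an empty list when all four variables are set, and the names of the missing ones otherwise.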

Description of required parameters:

Parameter | Meaning | Value Description
-c | The main function entry class of the job | org.apache.flink.lakesoul.entry.JdbcCDC
Main package | The jar that the job runs | lakesoul-flink-2.5.0-flink-1.17.jar
--source_db.db_type | Source database type | mysql postgres oracle
--source_db.host | Address of the source database |
--source_db.port | Port of the source database |
--source_db.user | Username for the source database |
--source_db.password | Password for the source database |
--source.parallelism | The parallelism of a single-table read task, which affects the data reading speed. The larger the value, the greater the pressure on the source database. | Can be adjusted according to the write QPS of the source database
--sink.parallelism | The parallelism of a single-table write task, which is also the number of primary key shards in the LakeSoul table. It affects how quickly data lands in the lake. The larger the value, the more small files are produced, which hurts subsequent read performance; the smaller the value, the greater the pressure on the write task and the greater the chance of data skew. | Can be adjusted according to the data volume of the largest table. It is generally recommended that one degree of parallelism (one primary key shard) manage no more than 10 million rows of data.
--warehouse_path | Data storage path prefix (the cluster prefix is required for hdfs) | LakeSoul writes each table's data to the ${warehouse_path}/database_name/table_name/ directory
--flink.savepoint | Flink savepoint path (the cluster prefix is required for hdfs) |
--flink.checkpoint | Flink checkpoint path (the cluster prefix is required for hdfs) |
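
The sizing guidance for --sink.parallelism and the directory layout under --warehouse_path can be worked through with a small calculation. This is only a sketch: both function names are hypothetical, the 10-million-row figure is the rule of thumb from the table, and rounding up to the next whole shard is an assumption about how to apply it.

```python
ROWS_PER_SHARD = 10_000_000  # rule of thumb from the parameter table


def suggest_sink_parallelism(largest_table_rows: int) -> int:
    """Smallest parallelism that keeps every primary key shard of the
    largest table at or below ROWS_PER_SHARD rows (never less than 1)."""
    if largest_table_rows < 0:
        raise ValueError("row count must not be negative")
    # Integer ceiling division avoids floating-point rounding surprises.
    shards = (largest_table_rows + ROWS_PER_SHARD - 1) // ROWS_PER_SHARD
    return max(1, shards)


def table_data_path(warehouse_path: str, database_name: str, table_name: str) -> str:
    """Directory a table is written to: ${warehouse_path}/database_name/table_name/."""
    return f"{warehouse_path.rstrip('/')}/{database_name}/{table_name}/"


print(suggest_sink_parallelism(250_000_000))
print(table_data_path("s3://bucket/lakesoul/flink/data", "testDB", "t1"))
```

A larger value than the suggestion produces more small files, so the result is a lower bound to start from rather than a target to exceed.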

Other Flink parameters, such as job manager, task manager CPU, memory, slots, etc., also need to be set according to the specific situation.

Optional parameter description:

Parameter | Meaning | Format
--job.checkpoint_mode | Data synchronization mode, the default is EXACTLY_ONCE | --job.checkpoint_mode AT_LEAST_ONCE
--job.checkpoint_interval | Checkpoint storage interval, in ms, default is 10 minutes | --job.checkpoint_interval 1200000

For MySQL, the following additional parameters need to be configured

Parameter | Required | Meaning | Format
--source_db.exclude_tables | optional | A comma-separated list of table names that do not need to be synchronized; the default is empty | --source_db.exclude_tables test_1,test_2
--server_time_zone=Asia/Shanghai | optional | MySQL server time zone; the Flink side defaults to "Asia/Shanghai" | Refer to the JDK ZoneId documentation

MySQL synchronization job example. For MySQL configuration, please refer to https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc.html

./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
lakesoul-flink-2.5.0-flink-1.17.jar \
--source_db.db_name "testDB" \
--source_db.user "root" \
--source.parallelism 1 \
--source_db.db_type "mysql" \
--source_db.password "123456" \
--source_db.host "172.17.0.2" \
--source_db.port 3306 \
--sink.parallelism 1 \
--server_time_zone=Asia/Shanghai \
--warehouse_path s3://bucket/lakesoul/flink/data \
--flink.checkpoint s3://bucket/lakesoul/flink/checkpoints \
--flink.savepoint s3://bucket/lakesoul/flink/savepoints
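
When jobs are launched from a scheduler rather than typed by hand, the same invocation can be assembled as an argument list, so that no shell line continuation is involved. A minimal sketch follows: `build_flink_command` is a hypothetical helper, every flag and value is copied from the MySQL example above, and the time zone argument is passed through verbatim in the spelling the example uses.

```python
import shlex


def build_flink_command(entry_class, jar, options, extra_args=(), flink_bin="./bin/flink"):
    """Return the argv list for `flink run` built from a dict of job options.

    Each key in `options` becomes `--key value`; `extra_args` are appended
    unchanged for arguments that should keep their original spelling.
    """
    argv = [flink_bin, "run", "-c", entry_class, jar]
    for flag, value in options.items():
        argv += [f"--{flag}", str(value)]
    argv += list(extra_args)
    return argv


mysql_options = {
    "source_db.db_name": "testDB",
    "source_db.user": "root",
    "source.parallelism": 1,
    "source_db.db_type": "mysql",
    "source_db.password": "123456",
    "source_db.host": "172.17.0.2",
    "source_db.port": 3306,
    "sink.parallelism": 1,
    "warehouse_path": "s3://bucket/lakesoul/flink/data",
    "flink.checkpoint": "s3://bucket/lakesoul/flink/checkpoints",
    "flink.savepoint": "s3://bucket/lakesoul/flink/savepoints",
}

command = build_flink_command(
    "org.apache.flink.lakesoul.entry.JdbcCDC",
    "lakesoul-flink-2.5.0-flink-1.17.jar",
    mysql_options,
    extra_args=["--server_time_zone=Asia/Shanghai"],
)

# Print a shell-quoted form for logging; pass `command` itself to
# subprocess.run to execute it without going through a shell.
print(shlex.join(command))
```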

For Oracle, the following additional parameters need to be configured

Parameter | Required | Meaning | Format
--source_db.schemaList | required | The list of schemas in the Oracle database | --source_db.schemaList schema1,schema2
--source_db.schema_tables | required | Table names; use commas to separate multiple tables | --source_db.schema_tables schema.table1,schema.table2
--server_time_zone=Asia/Shanghai | optional | Oracle server time zone; the Flink side defaults to "Asia/Shanghai" | Refer to the JDK ZoneId documentation
--source_db.splitSize | optional | The split size (number of rows) of a table snapshot; captured tables are divided into multiple splits when the snapshot is read. The default is 1024. | --source_db.splitSize 10000

Oracle synchronization job example. For Oracle configuration, please refer to https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html

./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
lakesoul-flink-2.5.0-flink-1.17.jar \
--source_db.db_type oracle \
--source_db.db_name "testDB" \
--source_db.user "FLINKUSER" \
--source.parallelism 1 \
--sink.parallelism 1 \
--source_db.password "flinkpw" \
--source_db.host "172.17.0.2" \
--source_db.port 1521 \
--source_db.splitSize 10000 \
--source_db.schemaList "FLINKUSER" \
--source_db.schema_tables "FLINKUSER.T1" \
--job.checkpoint_interval 1000 \
--server_time_zone=Asia/Shanghai \
--warehouse_path s3://bucket/lakesoul/flink/data \
--flink.checkpoint s3://bucket/lakesoul/flink/checkpoints \
--flink.savepoint s3://bucket/lakesoul/flink/savepoints

For Postgresql, the following additional parameters need to be configured

Parameter | Required | Meaning | Format
--source_db.schemaList | required | The list of schemas in the PostgreSQL database | --source_db.schemaList schema1,schema2
--source_db.schema_tables | required | Table names; use commas to separate multiple tables | --source_db.schema_tables schema.table1,schema.table2
--source_db.splitSize | optional | The split size (number of rows) of a table snapshot; captured tables are divided into multiple splits when the snapshot is read. The default is 1024. | --source_db.splitSize 10000
--pluginName | optional | The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. The default is decoderbufs. | --pluginName pgoutput
--source_db.slot_name | required | Postgres replication slot name | --source_db.slot_name flink

PostgreSQL synchronization job example. For PostgreSQL configuration, please refer to https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/postgres-cdc.html

./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
lakesoul-flink-2.5.0-flink-1.17.jar \
--source_db.db_name "postgres" \
--source_db.user "postgres" \
--source.parallelism 1 \
--source_db.schemaList "public" \
--source_db.db_type "postgres" \
--source_db.password "123456" \
--source_db.host "172.17.0.2" \
--source_db.port 5433 \
--source_db.splitSize 2 \
--source_db.schema_tables "public.t1" \
--source_db.slot_name flink \
--pluginName "pgoutput" \
--sink.parallelism 1 \
--job.checkpoint_interval 1000 \
--warehouse_path s3://bucket/lakesoul/flink/data \
--flink.checkpoint s3://bucket/lakesoul/flink/checkpoints \
--flink.savepoint s3://bucket/lakesoul/flink/savepoints
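
Since the three sources differ in which additional parameters are required, a launcher script can check them before submitting. The sketch below encodes only the "required" rows of the per-database tables above; the function names are hypothetical. The wildcard check follows the precaution about avoiding schema.* for Oracle, and applying it to every source is an assumption made here for simplicity.

```python
# Additional required options per source type, taken from the tables above.
EXTRA_REQUIRED = {
    "mysql": (),
    "oracle": ("source_db.schemaList", "source_db.schema_tables"),
    "postgres": (
        "source_db.schemaList",
        "source_db.schema_tables",
        "source_db.slot_name",
    ),
}


def missing_source_options(db_type, options):
    """Return the required database-specific options absent from `options`."""
    if db_type not in EXTRA_REQUIRED:
        raise ValueError(f"unsupported source_db.db_type: {db_type!r}")
    return [name for name in EXTRA_REQUIRED[db_type] if name not in options]


def suspicious_tables(schema_tables):
    """Return entries that are not of the form schema.table or use a wildcard."""
    flagged = []
    for entry in schema_tables.split(","):
        entry = entry.strip()
        schema, dot, table = entry.partition(".")
        if not (schema and dot and table) or "*" in entry:
            flagged.append(entry)
    return flagged


postgres_options = {
    "source_db.schemaList": "public",
    "source_db.schema_tables": "public.t1",
}
print(missing_source_options("postgres", postgres_options))
print(suspicious_tables("public.t1,t2,public.*"))
```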

In the initialization phase after the LakeSoul Flink job starts, it will first read all the tables in the configured MySQL DB (excluding tables that do not need to be synchronized). For each table, first determine whether it exists in LakeSoul. If it does not exist, a LakeSoul table is automatically created, and its schema is consistent with the corresponding table in MySQL.

After initialization, the CDC Stream of all tables will be read and written to the corresponding LakeSoul tables in Upsert mode.
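
Upsert mode means that, for a given primary key, the most recent change wins: an insert or update creates or overwrites the row, and a delete removes it. The sketch below illustrates that semantics on an in-memory dictionary. The event shape (`op`, `key`, `row`) is a hypothetical simplification and not the Debezium or LakeSoul record format.

```python
def apply_cdc_events(table, events):
    """Apply change events in order to `table`, a dict keyed by primary key."""
    for event in events:
        op, key = event["op"], event["key"]
        if op in ("insert", "update"):
            # Upsert: create the row if absent, otherwise overwrite the
            # columns carried by the event and keep the others.
            table[key] = {**table.get(key, {}), **event["row"]}
        elif op == "delete":
            table.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op!r}")
    return table


events = [
    {"op": "insert", "key": 1, "row": {"id": 1, "name": "a"}},
    {"op": "update", "key": 1, "row": {"name": "b"}},
    {"op": "insert", "key": 2, "row": {"id": 2, "name": "c"}},
    {"op": "delete", "key": 2},
]
result = apply_cdc_events({}, events)
print(result)
```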

If a DDL Schema change occurs to a MySQL table during synchronization, the change will also be applied to the corresponding LakeSoul table.
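
On the read side, the behaviour described in the precautions at the end of this page is that rows written before a column was added return null for it, and columns that were deleted are filtered out. A minimal sketch of that projection on plain dictionaries follows; `project_to_schema` is a hypothetical name and not a LakeSoul API.

```python
def project_to_schema(row, current_columns):
    """Shape a stored row to the table's current column list.

    Columns missing from the stored row (added later) become None, and
    stored columns no longer in the schema (deleted later) are omitted.
    """
    return {column: row.get(column) for column in current_columns}


# Row written before `email` was added and before `legacy` was deleted.
old_row = {"id": 1, "name": "a", "legacy": "x"}
current_columns = ["id", "name", "email"]
projected = project_to_schema(old_row, current_columns)
print(projected)
```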

LakeSoul Flink CDC Sink automatically saves the relevant state during job running, and can restore and rewrite the state when a Flink job fails, so data will not be lost.

When LakeSoul writes, write idempotency is guaranteed in two parts:

  1. Staging files are committed in the same way as in the Flink File Sink: the atomicity of the file system rename operation ensures that a staging file is moved to its final path. Because rename is atomic, no duplicate or missing writes occur after a failover.
  2. When the LakeSoul metadata is committed, the file path is recorded first; when the snapshot is updated, the file is marked as committed within a transaction. After a failover, checking whether a file has already been committed guarantees that the commit is idempotent.

In summary, LakeSoul Flink CDC Sink ensures that data is not lost through state recovery, and ensures that data is not duplicated by committing with idempotency, which achieves an exactly once semantic guarantee.
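
The two parts described above can be sketched as a rename followed by a commit that is safe to replay. This is an illustration under stated assumptions, not LakeSoul's implementation: `MetadataStore` is a hypothetical in-memory stand-in for the metadata service, and `os.replace` stands in for the file system rename (it is atomic on POSIX when both paths are on the same file system).

```python
import os
import tempfile


class MetadataStore:
    """Hypothetical in-memory stand-in for the metadata service."""

    def __init__(self):
        self.recorded = set()   # file paths that have been recorded
        self.committed = set()  # file paths marked as committed

    def record(self, path):
        self.recorded.add(path)

    def commit(self, path):
        """Mark `path` as committed; return False if it already was."""
        if path in self.committed:
            return False
        self.committed.add(path)
        return True


def commit_file(staging_path, final_path, meta):
    """Publish one staging file; replaying the call after a failure is harmless."""
    # Part 1: atomic rename. A missing staging file together with an
    # existing final file means an earlier attempt already moved it.
    if os.path.exists(staging_path):
        os.replace(staging_path, final_path)
    elif not os.path.exists(final_path):
        raise FileNotFoundError(staging_path)
    # Part 2: record the path, then commit it at most once.
    meta.record(final_path)
    return meta.commit(final_path)


with tempfile.TemporaryDirectory() as workdir:
    staging = os.path.join(workdir, "part-0.staging")
    final = os.path.join(workdir, "part-0.parquet")
    with open(staging, "w") as handle:
        handle.write("rows")
    meta = MetadataStore()
    first = commit_file(staging, final, meta)
    replay = commit_file(staging, final, meta)  # simulated retry after failover
    files = sorted(os.listdir(workdir))
    print(first, replay, files)
```

The replayed call neither duplicates the file nor commits it a second time, which is the property the paragraph above relies on.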

Type mapping relationship between MySQL and LakeSoul

Since MySQL, Spark, Parquet and other data types are not exactly the same, LakeSoul makes the following mapping relationships during synchronization (types that are not in the table are currently not supported):

MySQL Type | Spark Type
BOOLEAN, BOOL | org.apache.spark.sql.types.DataTypes.BooleanType
BIT(1) | org.apache.spark.sql.types.DataTypes.BooleanType
BIT(>1) | org.apache.spark.sql.types.DataTypes.BinaryType
TINYINT | org.apache.spark.sql.types.DataTypes.IntegerType
SMALLINT[(M)] | org.apache.spark.sql.types.DataTypes.IntegerType
MEDIUMINT[(M)] | org.apache.spark.sql.types.DataTypes.IntegerType
INT, INTEGER[(M)] | org.apache.spark.sql.types.DataTypes.IntegerType
BIGINT[(M)] | org.apache.spark.sql.types.DataTypes.LongType
REAL[(M,D)] | org.apache.spark.sql.types.DataTypes.FloatType
FLOAT[(M,D)] | org.apache.spark.sql.types.DataTypes.DoubleType
DOUBLE[(M,D)] | org.apache.spark.sql.types.DataTypes.DoubleType
CHAR[(M)] | org.apache.spark.sql.types.DataTypes.StringType
VARCHAR[(M)] | org.apache.spark.sql.types.DataTypes.StringType
BINARY[(M)] | org.apache.spark.sql.types.DataTypes.BinaryType
VARBINARY[(M)] | org.apache.spark.sql.types.DataTypes.BinaryType
TINYBLOB | org.apache.spark.sql.types.DataTypes.BinaryType
TINYTEXT | org.apache.spark.sql.types.DataTypes.StringType
BLOB | org.apache.spark.sql.types.DataTypes.BinaryType
TEXT | org.apache.spark.sql.types.DataTypes.StringType
MEDIUMBLOB | org.apache.spark.sql.types.DataTypes.BinaryType
MEDIUMTEXT | org.apache.spark.sql.types.DataTypes.StringType
LONGBLOB | org.apache.spark.sql.types.DataTypes.BinaryType
LONGTEXT | org.apache.spark.sql.types.DataTypes.StringType
JSON | org.apache.spark.sql.types.DataTypes.StringType
ENUM | org.apache.spark.sql.types.DataTypes.StringType
SET | org.apache.spark.sql.types.DataTypes.StringType
YEAR[(2|4)] | org.apache.spark.sql.types.DataTypes.IntegerType
TIMESTAMP[(M)] | org.apache.spark.sql.types.DataTypes.TimestampType
DATE | org.apache.spark.sql.types.DataTypes.DateType
TIME[(M)] | org.apache.spark.sql.types.DataTypes.LongType
DATETIME[(M)] | org.apache.spark.sql.types.DataTypes.TimestampType
NUMERIC[(M[,D])], DECIMAL[(M[,D])] | if decimal.handling.mode=precise: org.apache.spark.sql.types.DecimalType(M,D); if decimal.handling.mode=string: org.apache.spark.sql.types.DataTypes.StringType; if decimal.handling.mode=double: org.apache.spark.sql.types.DataTypes.DoubleType
GEOMETRY, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION, POINT | Not currently supported

The correspondence between these Spark types and the type names used in Spark SQL can be found in the Spark Data Types document.
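
The mapping above is essentially a lookup table with one special case for decimals. The sketch below encodes a handful of rows to show the shape of that lookup; it is deliberately incomplete, the function names are hypothetical, and the Spark types are given as plain strings rather than Spark objects.

```python
# A few rows copied from the MySQL table above; not the full mapping.
MYSQL_TO_SPARK = {
    "BOOLEAN": "BooleanType",
    "TINYINT": "IntegerType",
    "BIGINT": "LongType",
    "VARCHAR": "StringType",
    "BLOB": "BinaryType",
    "DATE": "DateType",
    "TIME": "LongType",
    "DATETIME": "TimestampType",
}


def decimal_spark_type(mode, precision, scale):
    """Spark type for NUMERIC/DECIMAL under a given decimal.handling.mode."""
    if mode == "precise":
        return f"DecimalType({precision},{scale})"
    if mode == "string":
        return "StringType"
    if mode == "double":
        return "DoubleType"
    raise ValueError(f"unknown decimal.handling.mode: {mode!r}")


def spark_type(mysql_type):
    """Look up a non-decimal MySQL type; types not in the table are rejected."""
    try:
        return MYSQL_TO_SPARK[mysql_type.upper()]
    except KeyError:
        raise ValueError(f"{mysql_type} is not currently supported") from None


print(spark_type("time"))
print(decimal_spark_type("precise", 10, 2))
```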

Data type matching

Type mapping relationship between Postgres and LakeSoul

Postgres Type | Spark Type
SMALLINT, INT2, SMALLSERIAL, SERIAL2 | org.apache.spark.sql.types.DataTypes.IntegerType
INTEGER, SERIAL | org.apache.spark.sql.types.DataTypes.IntegerType
BIGINT, BIGSERIAL | org.apache.spark.sql.types.DataTypes.LongType
REAL, FLOAT4, FLOAT8, DOUBLE PRECISION | org.apache.spark.sql.types.DataTypes.DoubleType
NUMERIC(p, s), DECIMAL(p, s) | if decimal.handling.mode=precise: org.apache.spark.sql.types.DecimalType(p, s); if decimal.handling.mode=string: org.apache.spark.sql.types.DataTypes.StringType; if decimal.handling.mode=double: org.apache.spark.sql.types.DataTypes.DoubleType
BOOLEAN | org.apache.spark.sql.types.DataTypes.BooleanType
DATE | org.apache.spark.sql.types.DataTypes.DateType
TIME [(p)] [WITHOUT TIMEZONE] | org.apache.spark.sql.types.DataTypes.LongType
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | org.apache.spark.sql.types.DataTypes.TimestampType
CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | org.apache.spark.sql.types.DataTypes.StringType
BYTEA | org.apache.spark.sql.types.DataTypes.BinaryType

Type mapping relationship between Oracle and LakeSoul

Oracle Type | Spark Type
SMALLINT, INT2, SMALLSERIAL, SERIAL2 | org.apache.spark.sql.types.DataTypes.IntegerType
NUMBER(p, s) | if decimal.handling.mode=precise: org.apache.spark.sql.types.DecimalType(p, s); if decimal.handling.mode=string: org.apache.spark.sql.types.DataTypes.StringType; if decimal.handling.mode=double: org.apache.spark.sql.types.DataTypes.DoubleType
FLOAT, BINARY_FLOAT | org.apache.spark.sql.types.DataTypes.DoubleType
DOUBLE PRECISION, BINARY_DOUBLE | org.apache.spark.sql.types.DataTypes.DoubleType
NUMBER(1) | org.apache.spark.sql.types.DataTypes.BooleanType
DATE | org.apache.spark.sql.types.DataTypes.DateType
TIMESTAMP [(p)] WITH TIME ZONE | org.apache.spark.sql.types.DataTypes.TimestampType
CHAR(n), NCHAR(n), NVARCHAR2(n), VARCHAR(n), VARCHAR2(n), CLOB, NCLOB, XMLType, SYS.XMLTYPE | org.apache.spark.sql.types.DataTypes.StringType
BLOB | org.apache.spark.sql.types.DataTypes.BinaryType

Precautions

  1. A table in MySQL must have a primary key; tables without a primary key are currently not supported.
  2. DDL changes currently support adding a column at the end or deleting a column in the middle. The default value of a new column can currently only be null, and LakeSoul automatically fills the column with null when reading old data. For a deleted column, LakeSoul automatically filters the column out when reading.
  3. The TIME type in MySQL corresponds to LongType in LakeSoul, because Spark has no TIME data type and Debezium represents a TIME value as the number of microseconds since 00:00:00. LakeSoul is therefore consistent with Debezium.
  4. The TIMESTAMP and DATETIME types in MySQL are stored as UTC time zone values in LakeSoul to avoid time zone resolution issues. When reading, you only need to specify a time zone and the values are parsed in that time zone. It is therefore necessary to fill in the server_time_zone parameter correctly when starting the Flink CDC job.
  5. Postgres needs to set wal_level = logical.
  6. To obtain complete Update event information in Postgres, you need to execute: alter table tablename replica identity full.
  7. Oracle needs supplemental logging enabled for synchronized tables: ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  8. When transferring tables from Oracle to LakeSoul, avoid using schema.* to transfer multiple tables.
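
Precautions 3 and 4 both concern how stored time values are turned back into something readable. The sketch below shows the two conversions with the Python standard library. The function names are hypothetical, the TIME helper assumes a value within a single day, and Asia/Shanghai is modelled as a fixed UTC+8 offset to keep the example dependency-free; a zone with daylight saving time would need a real time zone database such as `zoneinfo`.

```python
from datetime import datetime, time, timedelta, timezone

MICROS_PER_DAY = 24 * 60 * 60 * 1_000_000
SHANGHAI = timezone(timedelta(hours=8))  # fixed offset, assumption for the sketch


def micros_to_time(micros: int) -> time:
    """Convert a TIME value stored as microseconds since 00:00:00."""
    if not 0 <= micros < MICROS_PER_DAY:
        raise ValueError("this sketch only handles values within one day")
    return (datetime.min + timedelta(microseconds=micros)).time()


def utc_to_zone(stored_utc: datetime, zone: timezone) -> datetime:
    """Interpret a naive stored timestamp as UTC and render it in `zone`."""
    return stored_utc.replace(tzinfo=timezone.utc).astimezone(zone)


print(micros_to_time(3_723_000_004))
print(utc_to_zone(datetime(2024, 1, 1, 0, 0), SHANGHAI).isoformat())
```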